package org.niit.service

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.niit.bean.Answer

/**
 * Real-time analyses over a stream of [[Answer]] records.
 *
 * Each analysis derives a keyed DStream from the input, registers a `foreachRDD` output
 * operation on it, and prints that batch's result to the driver console.
 *
 * The lambdas handed to Spark in this class intentionally reference no members of the class,
 * so they never capture `this`. The class is not `Serializable`, and capturing it can break
 * DStream checkpointing if that is enabled.
 */
class EDUDataService {

  /**
   * Registers analyses 1-4 on `answer`.
   *
   * Analyses 5 and 6 are implemented in this class but are not invoked from here.
   *
   * NOTE(review): `answer` feeds four separate output operations. If its upstream lineage is
   * expensive to compute, consider persisting it before calling this — confirm with the caller.
   *
   * @param answer stream of answer records to analyse
   */
  def dataAnalysis(answer: DStream[Answer]): Unit = {
    hotQuestionTop10(answer)
    gradeQuestionTop10(answer)
    hotQuestionWithSubjectTop10(answer)
    studentMinScoreWithQuestionTop10(answer)
  }

  /**
   * Requirement 1: the 10 most-answered questions in each batch.
   *
   * Prints `(question_id, count)` pairs, highest count first.
   */
  private def hotQuestionTop10(answer: DStream[Answer]): Unit = {
    // (question, 1), (question, 1), ... ==> (question, n)
    val countDS = answer.map(data => (data.question_id, 1)).reduceByKey(_ + _)

    countDS.foreachRDD((rdd: RDD[(String, Int)]) => {
      // top(n) keeps a bounded priority queue per partition and returns rows largest-first.
      // sortBy(...).take(n) would run a sampling job plus a full shuffle sort to read 10 rows.
      val top10 = rdd.top(10)(Ordering.by(_._2))
      println("--------统计top10热点题----------")
      top10.foreach(println)
    })
  }

  /**
   * Requirement 2: the 10 grades with the most answers in each batch.
   *
   * Prints `(grade_id, count)` pairs, highest count first.
   */
  private def gradeQuestionTop10(answer: DStream[Answer]): Unit = {
    val countDS = answer.map(data => (data.grade_id, 1)).reduceByKey(_ + _)

    countDS.foreachRDD { rdd =>
      // Bounded top-k instead of a full distributed sort; result is largest-first.
      val top10 = rdd.top(10)(Ordering.by(_._2))
      println("--------统计top10答题活跃年级----------")
      top10.foreach(println)
    }
  }

  /**
   * Requirement 3: the 10 most-answered questions in each batch, keyed together with their
   * subject.
   *
   * Prints `((question_id, subject_id), count)` pairs, highest count first.
   */
  private def hotQuestionWithSubjectTop10(answer: DStream[Answer]): Unit = {
    val countDS = answer
      .map(data => ((data.question_id, data.subject_id), 1))
      .reduceByKey(_ + _)

    countDS.foreachRDD { rdd =>
      val top10 = rdd.top(10)(Ordering.by(_._2))
      println("--------统计top10热点题并带上所属科目----------")
      top10.foreach(println)
    }
  }

  /**
   * Requirement 4: the 10 lowest-scoring answers in each batch.
   *
   * Prints `((student_id, question_id), score)` records, lowest score first. No aggregation
   * is done, so the same student (or the same student/question pair) may appear several
   * times. Per-student ranking is what requirement 6 computes.
   */
  private def studentMinScoreWithQuestionTop10(answer: DStream[Answer]): Unit = {
    val scoreDS = answer.map(data => ((data.student_id, data.question_id), data.score))

    scoreDS.foreachRDD { rdd =>
      // takeOrdered returns the n smallest rows in ascending order, without a full sort.
      val lowest10 = rdd.takeOrdered(10)(Ordering.by(_._2))
      println("--------统计Top10学生的得分最低的题目，并带上所属题----------")
      lowest10.foreach(println)
    }
  }

  /**
   * Requirement 5: for each subject, its 10 most-answered questions in each batch.
   *
   * Prints one `(subject_id, List((question_id, count), ...))` entry per subject, with the
   * list ordered by count, highest first. Not currently invoked by [[dataAnalysis]].
   */
  private def subjectWithHostQuestionTop10(answer: DStream[Answer]): Unit = {
    // ((subject, question), 1) ... ==> ((subject, question), n)
    val countDS = answer
      .map(data => ((data.subject_id, data.question_id), 1))
      .reduceByKey(_ + _)

    // Re-key by subject alone: (subject, [(question, n), ...])
    val bySubjectDS = countDS
      .map { case ((subject, question), count) => (subject, (question, count)) }
      .groupByKey()

    bySubjectDS.foreachRDD { rdd =>
      val top10PerSubject = rdd.mapValues { questionCounts =>
        questionCounts.toList.sortBy(_._2)(Ordering[Int].reverse).take(10)
      }
      println("--------实时统计每个科目Top10热点题----------")
      top10PerSubject.collect().foreach(println)
    }
  }

  /**
   * Requirement 6: for each student, the 10 lowest-scoring answers in each batch.
   *
   * Prints one `(student_id, List((question_id, score), ...))` entry per student, with the
   * list ordered by score, lowest first. Not currently invoked by [[dataAnalysis]].
   */
  private def studentWithMinScoreQuestionTop10(answer: DStream[Answer]): Unit = {
    val byStudentDS = answer
      .map(data => (data.student_id, (data.question_id, data.score)))
      .groupByKey()

    byStudentDS.foreachRDD { rdd =>
      val lowest10PerStudent = rdd.mapValues { questionScores =>
        questionScores.toList.sortBy(_._2).take(10)
      }
      println("--------统计每个学生的得分最低的前10道题，并带上所属题----------")
      lowest10PerStudent.collect().foreach(println)
    }
  }

}
